# -*- coding: utf-8 -*-
# @Time        : 2023/9/21 17:06
# @Author      : bai.<byscut2010@foxmail.com>
# @File        : stop_task.py
# @Description :
import celery
from celery.app.control import Control
from celery.result import AsyncResult


def stop_celery_task(task_id: str, app=celery):
    task_status = AsyncResult(id=str(task_id), app=app).state.upper()
    print(f"Task {task_id} status: {task_status}")
    celery_control = Control(app=app)
    if task_status in ['SUCCESS', 'FAILURE']:
        return
    celery_control.revoke(task_id, terminate=True, signal='SIGKILL')
    print(f"Task {task_id} has been revoked.")
